Spark SQL是如何选择join策略的?
前言
join判断条件
build table侧的选择
private def canBuildRight(joinType: JoinType): Boolean = joinType match {
case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
case _ => false
}
private def canBuildLeft(joinType: JoinType): Boolean = joinType match {
case _: InnerLike | RightOuter => true
case _ => false
}
表如何被广播
spark.sql.autoBroadcastJoinThreshold
参数规定的值(默认值是10MB,可修改),那么它会被自动广播出去。对应代码如下。 private def canBroadcast(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}
private def canBroadcastBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
: Boolean = {
val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
val buildRight = canBuildRight(joinType) && canBroadcast(right)
buildLeft || buildRight
}
除了上述阈值之外,Spark SQL还允许在语句里使用broadcast hint(即/* +BROADCAST(t) */
)来手动指定要广播的表,判断逻辑如下所示。
private def canBroadcastByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
: Boolean = {
val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
buildLeft || buildRight
}
根据阈值和根据hint广播表的方法如下,逻辑比较简单。
private def broadcastSideBySizes(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
: BuildSide = {
val buildLeft = canBuildLeft(joinType) && canBroadcast(left)
val buildRight = canBuildRight(joinType) && canBroadcast(right)
broadcastSide(buildLeft, buildRight, left, right)
}
private def broadcastSideByHints(joinType: JoinType, left: LogicalPlan, right: LogicalPlan)
: BuildSide = {
val buildLeft = canBuildLeft(joinType) && left.stats.hints.broadcast
val buildRight = canBuildRight(joinType) && right.stats.hints.broadcast
broadcastSide(buildLeft, buildRight, left, right)
}
这两个方法最终都调用了broadcastSide()方法确定应该广播哪个表。
private def broadcastSide(
canBuildLeft: Boolean,
canBuildRight: Boolean,
left: LogicalPlan,
right: LogicalPlan): BuildSide = {
def smallerSide =
if (right.stats.sizeInBytes <= left.stats.sizeInBytes) BuildRight else BuildLeft
if (canBuildRight && canBuildLeft) {
// Broadcast smaller side base on its estimated physical size
// if both sides have broadcast hint
smallerSide
} else if (canBuildRight) {
BuildRight
} else if (canBuildLeft) {
BuildLeft
} else {
// for the last default broadcast nested loop join
smallerSide
}
}
该方法先根据表的统计信息找出左表和右表中size较小的那个,如果左表和右表都能或者都不能作为build table,就将较小的表广播。否则,先判断右表是否可作为build table,可行的话优先广播右表,再判断左表。可见,broadcast hint只能表示用户广播表的偏好,实际执行时未必会按照broadcast hint指定的表来。
是否可构造本地HashMap
Shuffle hash join过程中,如果数据量不大,就可以用本地哈希表保存Shuffle中间结果,提高效率。当逻辑计划的数据量小于广播阈值与Shuffle分区数的乘积,即小于spark.sql.autoBroadcastJoinThreshold * spark.sql.shuffle.partitions
时,说明单个分区的数据量足够小,可以安全地构造本地HashMap。
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}
join策略选择
这部分源码都位于JoinSelection对象的apply()方法中。重要的话再说一次,策略的选择会按照效率从高到低的优先级来排。
Broadcast hash join
// broadcast hints were specified
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
// broadcast hints were not specified, so need to infer it from size and configuration.
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
Seq(joins.BroadcastHashJoinExec(
leftKeys, rightKeys, joinType, buildSide, condition, planLater(left), planLater(right)))
可见是先根据broadcast hint来判断,其次是广播阈值。判断的条件在上一节已经说完了哈。
Shuffle hash join
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildRight(joinType) && canBuildLocalHashMap(right)
&& muchSmaller(right, left) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildRight, condition, planLater(left), planLater(right)))
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if !conf.preferSortMergeJoin && canBuildLeft(joinType) && canBuildLocalHashMap(left)
&& muchSmaller(left, right) ||
!RowOrdering.isOrderable(leftKeys) =>
Seq(joins.ShuffledHashJoinExec(
leftKeys, rightKeys, joinType, BuildLeft, condition, planLater(left), planLater(right)))
选择Shuffle hash join策略的条件比较严苛,大前提是不优先采用Sort merge join,即spark.sql.join.preferSortMergeJoin
配置项为false。与Broadcast hash join相同的,Shuffle hash join也是先检查右表,后检查左表。以右表为例,还需要满足以下3个条件:
右表能够作为build table;
能够从右表构建本地HashMap;
右表的数据量比左表小很多(即muchSmaller()方法),“很多”在代码中规定为3倍。
除去上述情况外,如果参与join的表的key无法被排序(即根本无法使用Sort merge join),那么也会fallback到Shuffle hash join策略。
Sort merge join
case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right)
if RowOrdering.isOrderable(leftKeys) =>
joins.SortMergeJoinExec(
leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
如果上面两种策略都不符合,并且参与join的key是可以排序的话,就会采取Sort merge join。这个要求不高,所以Spark SQL中非小表的join都会采用此策略。
Non equi-join
// Pick BroadcastNestedLoopJoin if one side could be broadcast
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastByHints(joinType, left, right) =>
val buildSide = broadcastSideByHints(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
case j @ logical.Join(left, right, joinType, condition)
if canBroadcastBySizes(joinType, left, right) =>
val buildSide = broadcastSideBySizes(joinType, left, right)
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
// Pick CartesianProduct for InnerJoin
case logical.Join(left, right, _: InnerLike, condition) =>
joins.CartesianProductExec(planLater(left), planLater(right), condition) :: Nil
case logical.Join(left, right, joinType, condition) =>
val buildSide = broadcastSide(
left.stats.hints.broadcast, right.stats.hints.broadcast, left, right)
// This join could be very slow or OOM
joins.BroadcastNestedLoopJoinExec(
planLater(left), planLater(right), buildSide, joinType, condition) :: Nil
文章不错?点个【在看】吧! 👇